
WIP Allowing named worker-setups #2391

Open
wants to merge 17 commits into base: main

Conversation

rainwoodman
Contributor

  1. Convert worker-setups to a dictionary.

  2. The coroutines in the scheduler take a callbacks dict as input,
     where functions are serialized as strings/bytes.

  3. When the name already exists,
     the newly registered setup function will immediately be run
     on all existing workers, but new workers will only see
     the updated version of the setup function.

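The semantics in point 3 can be modeled with a toy sketch (not the actual distributed code; the class and method names here are made up for illustration):

```python
class SchedulerSketch:
    """Toy model of named worker-setups keyed by name."""

    def __init__(self):
        self.worker_setups = {}  # name -> setup function
        self.workers = []

    def register_worker_setup(self, name, setup):
        # Re-registering an existing name replaces the stored function
        # and runs the new version immediately on all existing workers.
        self.worker_setups[name] = setup
        for worker in self.workers:
            setup(worker)

    def add_worker(self, worker):
        # New workers only ever see the latest version of each setup.
        self.workers.append(worker)
        for setup in self.worker_setups.values():
            setup(worker)
```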
@rainwoodman
Contributor Author

This is a WIP.

I am having difficulty running the tests. pytest gives me strange errors:

  $ pytest distributed

  .....
    action = super(_ArgumentGroup, self)._add_action(action)
  File "/home/yfeng1/anaconda3/install/lib/python3.6/argparse.py", line 1366, in _add_action
    self._check_conflict(action)
  File "/home/yfeng1/anaconda3/install/lib/python3.6/argparse.py", line 1505, in _check_conflict
    conflict_handler(action, confl_optionals)
  File "/home/yfeng1/anaconda3/install/lib/python3.6/argparse.py", line 1514, in _handle_conflict_error
    raise ArgumentError(action, message % conflict_string)
argparse.ArgumentError: argument -L/--leaks: conflicting option string: --leaks

@mrocklin
Member

mrocklin commented Dec 2, 2018

Unfortunately I'm not familiar with that error message. I recommend trying from a clean environment or making sure that you don't have other configuration lying around.

I know that this is a WIP, but one thought on quick review is that it would be good to support backwards compatibility for the Client API. In particular we might place the name parameter after setup and have it default to distributed.utils.funcname(setup) or something similar.
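That suggestion might look roughly like the following sketch (funcname here is a simplified stand-in for distributed.utils.funcname, and the body is illustrative, not the PR's code):

```python
def funcname(func):
    # Simplified stand-in for distributed.utils.funcname.
    return getattr(func, '__name__', str(func))

def register_worker_callbacks(setup, name=None):
    # name comes after setup and defaults to the function's name,
    # so existing positional call sites keep working unchanged.
    if name is None:
        name = funcname(setup)
    return name  # the name under which the callback is registered
```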

@rainwoodman
Contributor Author

  • You mean running pytest in the code directory is supposed to work?

  • I was thinking of an API breakage for two reasons:

If we use an optional name, it should probably be the fully qualified function name to avoid accidental conflicts. In many cases it is impossible to uniquely determine the fully qualified name of the callback function, so callers are forced to provide a name anyway.

Also problematic is the planned inclusion of other callback functions; when that happens, which function name do we use?

@guillaumeeb
Member

I'm also against API breakage. Accidental conflicts could only come from several users using the same cluster, in this case it shall be clearly documented to use a name or label for the callback.

Also problematic is the planned inclusion of other callback functions; when that happens, which function name do we use?

I'm not sure I get this, we should record one callback per function, so one function name per callback?

@rainwoodman
Contributor Author

OK. Let me figure out what I can do.

assert len(s.worker_setups) == 2

# Check it has been run
result = yield c.run(test_startup2)
assert list(result.values()) == [True] * 2

# unregister a preload function
Contributor Author

I changed 'register_worker_callbacks' to return the registered names, to allow unregistering callback functions with inferred names.

self.batched_stream = BatchedSend(interval='2ms', loop=self.loop)
self.batched_stream.start(comm)
self.periodic_callbacks['heartbeat'].start()
self.loop.add_callback(self.handle_scheduler, comm)

# run eventual init functions (only worker-setups for now)
Contributor Author

I moved this so the worker init functions run after the heartbeat; also, I am using the local worker_setups dictionary rather than the unpickled function directly.

""" Registers a setup function, and call it on every worker """
if setup is None:
raise gen.Return({})
def register_worker_callbacks(self, comm, callbacks, names):
Contributor Author

I preserved the Client API, but changed the scheduler API in an incompatible way.

This is to avoid:

register_worker_callbacks(self, comm, setup, setup_name, teardown, teardown_name):

Member

I understand the problem, but I'm not entirely convinced by the proposed solution. I've not much to propose though...

Are you expecting that we will register more than setup and teardown callbacks at once in the future?

Could we use tuples instead of a dict containing several callbacks? E.g. stick to

register_worker_callbacks(self, comm, setup=None, teardown=None):

But where we expect setup or teardown to be (name, function)?
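The tuple convention could be normalized in one place; a sketch under that assumption (normalize_callback and the generated-name fallback are illustrative, not code from this PR):

```python
def normalize_callback(cb):
    # A (name, function) tuple carries an explicit name; a bare
    # callable falls back to a name derived from the function itself.
    if isinstance(cb, tuple):
        name, func = cb
    else:
        func = cb
        name = getattr(func, '__name__', repr(func))
    return name, func
```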

Contributor Author

I think a tuple would work. We can actually extend this all the way to the worker: if a tuple is given, then the first element is the name. If not a tuple, then we generate a name from the function.

Contributor Author

I will rework the PR along this line.

Member

Sorry for not responding to this earlier. I personally prefer asking for the name= keyword explicitly rather than passing it through as a tuple.

client.register_worker_callbacks(setup=setup_database, teardown=teardown_database, name='database')

I personally find passing tuples somewhat awkward.

@guillaumeeb what was your concern with the name= keyword solution? That it is hard to provide ordered-parameter compatibility going forward?

Contributor Author

I liked the tuple because:

  • each callback needs a different name.
  • it was decided register_worker_callbacks can register multiple callbacks in one call.
  • there is no clean way to assign names to callbacks with a single name argument and multiple callbacks.

Member

My concern was not about the name= keyword solution, but about the callbacks=dict solution, which I find kind of obscure even if documented. The tuple may not be better though.

I think I prefer the explicit register_worker_callbacks(self, comm, setup, setup_name, teardown, teardown_name) over the one with only a dict of callbacks.

Contributor Author

There wouldn't be a problem if the function was instead:

register_worker_callback(self, comm, callback_type, function, name)

I doubt there is a need to make registering several callbacks atomic.

Contributor Author

Here is another possibility.

register_worker_callbacks(self, comm, setup, name=None)

If setup takes two arguments, setup(worker, event), then we will call it as
setup(worker, 'setup')
setup(worker, 'teardown')

If setup takes one or zero arguments, we only call it during setup.
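A minimal sketch of that dispatch, using inspect.signature to count parameters (run_callback is a hypothetical helper, not part of this PR):

```python
import inspect

def run_callback(setup, worker, event):
    # A two-argument callable is invoked for both the 'setup' and
    # 'teardown' events; shorter signatures only run during setup.
    n_params = len(inspect.signature(setup).parameters)
    if n_params >= 2:
        setup(worker, event)
    elif event == 'setup':
        if n_params == 1:
            setup(worker)
        else:
            setup()
```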

Member

Maybe when we add teardowns we decide that we need classes instead?

So today

def register_worker_callbacks(self, comm, setup, name=None):

Where setup is a callable.

In the future

def register_worker_callbacks(self, comm, cb, name=None):

Where cb is either a callable, in which case it is interpreted as a setup function, or it is a class with the following structure:

class MyWorkerCallback:
    def __init__(self, worker):
        ...
    def setup(self):
        ...
    def tearDown(self):
        ...
    ...

In this way we continue to maintain the same structure today, provide a path towards supporting teardowns in the future, but don't have to rely on implicit tuple semantics.

callbacks['setup'] = serialized
if name is None:
h = md5(serialized)
name = funcname(setup) + '-' + h.hexdigest()
Contributor Author

This achieves the hashing based on the function "body" that @mrocklin mentioned.
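In isolation, the naming logic in the diff behaves roughly like this runnable sketch (funcname is a simplified stand-in for distributed.utils.funcname):

```python
from hashlib import md5

def funcname(func):
    # Simplified stand-in for distributed.utils.funcname.
    return getattr(func, '__name__', repr(func))

def generate_name(setup, serialized, name=None):
    # The default name hashes the serialized function bytes, so two
    # functions that are both called 'setup' but have different bodies
    # still get distinct registry names.
    if name is None:
        name = funcname(setup) + '-' + md5(serialized).hexdigest()
    return name
```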

Member

Doing a hash of the serialized function seems a little overkill to me.

Contributor Author

Speed is not a concern here since it is only intended to be used at initialization.

The hash produces two different names for lib1.setup and lib2.setup, even when a name is not explicitly given. I would have preferred __qualname__, but it is only defined in Python 3.3+.

Member

I recommend using dask.base.tokenize generally for hashing objects. We tend to centralize our logic there.

@rainwoodman
Contributor Author

Travis passed. (The failure is unrelated).

This is ready for a review.

Member

@guillaumeeb left a comment

Generally it looks good; a few minor comments/questions. I'm not convinced by the callbacks kwarg.


raise gen.Return(result)

def unregister_worker_callbacks(self, setup=None):
""" """
Member

Missing docstring here.

Are we sure we want an unregistering function? What about workers where the callback has already been called?

Contributor Author

I had the same doubt too!

Then I realized this is useful if one then calls client.restart() to restart all workers.


if setup=func is given, generate a name.
There is no longer a guarantee that the setup function is always
run together with the register call:

If the same function is registered twice, it will not run the second
time.
Member

@mrocklin left a comment

Some small style suggestions. I haven't looked at the full design yet though.

@@ -22,6 +22,7 @@
import socket
import warnings
import weakref
from hashlib import md5
Member

We tend to use dask.base.tokenize for this (though what you've done here is fine)

The setup callback to remove; it will be matched by the name.
If a name is not given, then it is generated from the callable.
"""
return self.sync(self._unregister_worker_callbacks, setup=setup)
Member

These two are simple enough that you can merge them. The definition of Client.cancel is a good example. You can use self.sync(self.scheduler.unregister_worker_callbacks, ...) directly.

Contributor Author

Client.cancel still goes through Client._cancel. I had the impression it was the convention to always have two interfaces to the same functionality: the coroutine version and the non-coroutine version.

Member

Ah, you're right. Perhaps list_datasets then.

We need to create a coroutine if there is a yield somewhere. If not, then we're happy to skip it for simplicity.

Contributor Author

Thanks. I changed it.

""" Registers a set of event driven callback functions on workers for the given name.

setup must be a tuple of (name, serialized_function)
"""
Member

For spacing I recommend the following:

"""
Registers a set of event driven callback functions on workers for the given name  # <<--- move this down to avoid the slightly longer line

setup must be a tuple of (name, serialized_function)  # <<<--- move this left to the same level as """
"""

name, func = setup
self.worker_setups.pop(name)

raise gen.Return(None)
Member

This doesn't need to be a coroutine. I recommend removing the @gen.coroutine line and replacing raise gen.Return(...) with return ...

Member

To be clear, the reason here is that this method never yields, so it doesn't need to be a coroutine
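As a minimal illustration (class and names are made up, not the PR's actual code), a non-yielding method can be a plain method with a plain return:

```python
class SchedulerStub:
    def __init__(self):
        self.worker_setups = {'database': lambda worker: None}

    # Plain method: the body never yields, so the tornado-style
    # @gen.coroutine decorator and raise gen.Return(None) add nothing.
    def unregister_worker_callbacks(self, name):
        self.worker_setups.pop(name, None)
        return None
```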

@mrocklin
Member

mrocklin commented Dec 9, 2018

I apologize for the delay in review here. I've been travelling for work a bit more than is ideal.

@rainwoodman
Contributor Author

Is there a way to sync a coroutine in the tests (like self.sync)? I am talking about this:

@gen_cluster(client=True)
def test_register_worker_callbacks(c, s, a, b):
    
    ....

    def assert_mystartup1_called():
        r = ???.sync(client.run, lambda worker: hasattr(worker, 'mystartup1_value'))
        return r.values()

    ....
    if not all(assert_mystartup1_called()): raise AssertionError(xxxxxx)

The test case can be hugely simplified if I can do this.

@mrocklin
Member

Is there a way to sync a coroutine in the tests?

The client.run method magically works as a coroutine when called from within a coroutine. So you should be able to do the following:

r = yield client.run(...)

@rainwoodman
Contributor Author

It won't work if I put yield client.run inside a nested function defined in the test case, and I do not want to define the nested function as a coroutine.

Base automatically changed from master to main March 8, 2021 19:03
@rainwoodman rainwoodman requested a review from fjetter as a code owner January 23, 2024 10:57